KAFKA-6501: Dynamic broker config tests updates and metrics fix #4539
Conversation
@hachikuji Can you review, please? Thank you!
Thanks for the patch. Left one question and a couple minor comments.
val node = nodeMap.get(listenerName)
warn(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
node
}.getOrElse(None)
nit: I think you don't need this if you use `flatMap`.
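The `flatMap` suggestion can be sketched as follows. This is a hypothetical, self-contained illustration with simplified types, not the actual MetadataCache code:

```scala
object FlatMapSketch {
  // Simplified stand-in for aliveNodes: brokerId -> (listenerName -> endpoint).
  val aliveNodes: Map[Int, Map[String, String]] =
    Map(0 -> Map("PLAINTEXT" -> "localhost:9092"))

  // Original style: map produces Option[Option[String]], so a trailing
  // getOrElse(None) is needed to flatten it.
  def lookupWithMap(brokerId: Int, listener: String): Option[String] =
    aliveNodes.get(brokerId).map(_.get(listener)).getOrElse(None)

  // flatMap collapses the nested Option directly; no getOrElse(None) required.
  def lookupWithFlatMap(brokerId: Int, listener: String): Option[String] =
    aliveNodes.get(brokerId).flatMap(_.get(listener))

  def main(args: Array[String]): Unit = {
    assert(lookupWithMap(0, "PLAINTEXT") == lookupWithFlatMap(0, "PLAINTEXT"))
    assert(lookupWithFlatMap(1, "PLAINTEXT").isEmpty)
  }
}
```

Both forms return the same `Option[String]`; `flatMap` just avoids the nested `Option`.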
val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners

val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
  .filter(_.tags.containsKey("networkProcessor"))
There is also a response queue size metric which uses the "processor" tag. Maybe we can add a check to ensure its deletion as well?
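The suggested broader check could look like the following sketch. The tag names come from the discussion above, but the metric plumbing here is a simplified stand-in, not the actual test code:

```scala
object MetricTagSketch {
  // Simplified representation of a registered metric: name plus tags.
  final case class MetricName(name: String, tags: Map[String, String])

  val metrics: Set[MetricName] = Set(
    MetricName("io-ratio", Map("networkProcessor" -> "0")),
    MetricName("response-queue-size", Map("processor" -> "0")),
    MetricName("request-rate", Map.empty)
  )

  // Filtering on both tags ensures the response queue size metric
  // (tagged "processor") is also checked for deletion.
  def perProcessorMetrics: Set[MetricName] = metrics.filter { m =>
    m.tags.contains("networkProcessor") || m.tags.contains("processor")
  }

  def main(args: Array[String]): Unit =
    assert(perProcessorMetrics.map(_.name) == Set("io-ratio", "response-queue-size"))
}
```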
aliveNodes.get(brokerId).map { nodeMap =>
  nodeMap.getOrElse(listenerName,
    throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
To check my understanding: previously, when we raised this exception, the Metadata request would have failed with an unknown server error (since this exception has no error code), which would probably have been raised to the user. Is that right? Now we will return LEADER_NOT_AVAILABLE instead and the client will retry.
I am wondering in this case if we really should have a separate error code to indicate that there is no listener provided so that we can at least log a warning in the client. It seems more likely that this is the result of a misconfiguration than a delayed config update.
Yes, we should be careful here. Otherwise we may create a hard-to-diagnose problem for a common case (a misconfigured listener).
@hachikuji @ijuma Thanks for the reviews. At the moment, as Jason said, the user sees an unknown server error which is not retried, and neither client nor broker logs anything to show what went wrong. I did initially consider adding a new error code for this, but the problem is that old clients wouldn't recognize it, and I thought they wouldn't retry as a result (maybe I am wrong). So LEADER_NOT_AVAILABLE seemed a reasonable error to send to the client. Since the problem occurs only if some brokers have a listener and others don't, I was thinking a log entry in the broker logs would be sufficient.
@rajinisivaram if we bumped the version of the relevant protocols in the 1.1 cycle, we could conditionally return a new error code and fall back to LEADER_NOT_AVAILABLE otherwise. If we didn't bump them, then it's less clear whether it's worth doing just for this case.
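The version-gated fallback being discussed could be sketched like this. The error names and the version threshold here are illustrative placeholders, not the actual Kafka error codes or protocol versions:

```scala
object ErrorFallbackSketch {
  sealed trait ApiError
  case object ListenerNotFound extends ApiError   // hypothetical new error code
  case object LeaderNotAvailable extends ApiError // existing retriable error

  // Clients on a bumped protocol version would get the dedicated error;
  // older clients fall back to the retriable LEADER_NOT_AVAILABLE.
  def errorFor(requestVersion: Int, minVersionForNewError: Int): ApiError =
    if (requestVersion >= minVersionForNewError) ListenerNotFound
    else LeaderNotAvailable

  def main(args: Array[String]): Unit = {
    assert(errorFor(6, 6) == ListenerNotFound)
    assert(errorFor(5, 6) == LeaderNotAvailable)
  }
}
```

This is why the version bump matters: without it, there is no way to know whether the client will understand the new code.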
We don't seem to have bumped the version of `MetadataRequest` or `FindCoordinatorRequest` in 1.1 (not sure if there are other requests which use this code).
Hmm, that's a fair point. Something else to consider is whether we should log the message when we receive the UpdateMetadata request from the controller rather than the Metadata request from the clients.
@hachikuji Yes, that makes sense, updated. Do we still want to change protocol version and add a new error code for 1.1?
@hachikuji @rajinisivaram How about we add it to the KIP but implement it in the next version? I believe we have KIPs in progress that suggest changing MetadataRequest and we could piggyback on one of them.
That sounds good to me. I think this is still an improvement over existing behavior.
@hachikuji @ijuma Sounds good. I will update the KIP and create a JIRA for the next version. I think all the other comments on this one have been addressed. Let me know if anything else needs to be done for 1.1. Thanks.
@@ -740,6 +740,7 @@ private[kafka] class Processor(val id: Int,
    close(channel.id)
  }
  selector.close()
  removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
Can we add a constant for the metric name?
We don't seem to use constants for other metric names; it looks odd to have one just for this?
I don't see any other metrics in this method. Generally, if the same magic value is used in two places, we should definitely use a constant. We don't follow this rule consistently, which is a shame.
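The suggested constant could look like this sketch. The constant name and the key-building helper are hypothetical; the real `Processor` code may name and scope things differently:

```scala
object MetricNameSketch {
  // Shared constant referenced at both the registration and the removal site,
  // so a typo cannot silently leave the metric behind on shutdown.
  val IdlePercentMetricName = "IdlePercent"

  // Stand-in for a metric registry key: name plus tags.
  def metricKey(name: String, tags: Map[String, String]): String =
    name + tags.map { case (k, v) => s",$k=$v" }.mkString

  def main(args: Array[String]): Unit = {
    val registered = metricKey(IdlePercentMetricName, Map("networkProcessor" -> "0"))
    val removed = metricKey(IdlePercentMetricName, Map("networkProcessor" -> "0"))
    assert(registered == removed)
  }
}
```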
    throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
}
val node = nodeMap.get(listenerName)
warn(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
Hmm, shouldn't this be logged if `node` is `None`?
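The fix being asked for can be sketched as follows; this uses simplified types and a `println` stand-in for `warn`, not the actual broker code:

```scala
object WarnOnMissingSketch {
  // Simplified stand-in for the per-broker listener -> endpoint map.
  val nodeMap: Map[String, String] = Map("PLAINTEXT" -> "localhost:9092")

  def getNode(brokerId: Int, listenerName: String): Option[String] = {
    val node = nodeMap.get(listenerName)
    // Warn only on the failure path, not unconditionally.
    if (node.isEmpty)
      println(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
    node
  }

  def main(args: Array[String]): Unit = {
    assert(getNode(0, "SSL").isEmpty)          // warns
    assert(getNode(0, "PLAINTEXT").isDefined)  // silent
  }
}
```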
Force-pushed from c67a5c8 to 3a6ed70.
@rajinisivaram Note that the test failure appears related.
@hachikuji Yes, thank you, I will fix the test failure and rebase.
Force-pushed from 3a6ed70 to d005c79.
LGTM, and thanks for fixing the metric that I broke.
1. Handle listener-not-found in MetadataCache since this can occur when listeners are being updated. To avoid breaking clients, this is handled in the same way as broker-not-available so that clients may retry.
2. Set retries=1000 for listener reconfiguration tests to avoid transient failures when the metadata cache has not been updated.
3. Remove IdlePercent metric when Processor is deleted; add test.
4. Reduce log segment size used during reconfiguration to avoid timeout while waiting for log rolling.
5. Test markPartitionsForTruncation after fetcher thread resize.
6. Move per-processor ResponseQueueSize metric back to RequestChannel.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>